In [1]:
import pyspark
from pyspark.sql import SQLContext
# create spark contexts
sc = pyspark.SparkContext()
sqlContext = SQLContext(sc)
In [2]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import preproc as pp
# Register all the functions in Preproc with Spark Context
check_lang_udf = udf(pp.check_lang, StringType())
remove_stops_udf = udf(pp.remove_stops, StringType())
remove_features_udf = udf(pp.remove_features, StringType())
tag_and_remove_udf = udf(pp.tag_and_remove, StringType())
lemmatize_udf = udf(pp.lemmatize, StringType())
check_blanks_udf = udf(pp.check_blanks, StringType())
In [3]:
# Load the text file and split each line into tab-delimited fields.
data_rdd = sc.textFile("data/nlpdata/raw_classified.txt")
parts_rdd = data_rdd.map(lambda l: l.split("\t"))
# Filter bad rows out
guarantee_col_rdd = parts_rdd.filter(lambda l: len(l) == 3)
typed_rdd = guarantee_col_rdd.map(lambda p: (p[0], p[1], float(p[2])))
#Create DataFrame
data_df = sqlContext.createDataFrame(typed_rdd, ["text", "id", "label"])
#data_df.show()
data_df.printSchema()
In [4]:
data_df.show(4)
In [5]:
# predict language and filter out those with less than 90% chance of being English
lang_df = data_df.withColumn("lang", check_lang_udf(data_df["text"]))
en_df = lang_df.filter(lang_df["lang"] == "en")
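The preproc module itself is not included in this notebook. Here is a minimal sketch of what pp.check_lang might look like, assuming it uses langid with normalized probabilities; the real implementation may differ.
# sketch of pp.check_lang (assumption: preproc is not shown here)
from langid.langid import LanguageIdentifier, model

identifier = LanguageIdentifier.from_modelstring(model, norm_probs=True)

def check_lang(data_str):
    # return the predicted language only when its probability is at least 90%
    lang, prob = identifier.classify(data_str)
    return lang if prob >= 0.9 else 'NA'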
In [6]:
en_df.printSchema()
In [7]:
en_df.show(4)
In [8]:
# remove stop words to reduce dimensionality
rm_stops_df = en_df.withColumn("stop_text", remove_stops_udf(en_df["text"]))
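A similar sketch for pp.remove_stops, assuming NLTK's English stop word list (requires nltk.download('stopwords') once); again, the real module may differ.
# sketch of pp.remove_stops (assumption)
from nltk.corpus import stopwords

stop_words = set(stopwords.words('english'))

def remove_stops(data_str):
    # drop any token that appears in NLTK's English stop word list
    return ' '.join(w for w in data_str.split() if w.lower() not in stop_words)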
In [9]:
rm_stops_df.printSchema()
In [10]:
rm_stops_df.show(4)
In [11]:
# remove other non-essential words; think of it as my personal stop word list
rm_features_df = rm_stops_df.withColumn("feat_text", \
remove_features_udf(rm_stops_df["stop_text"]))
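A sketch of what pp.remove_features could do, assuming it strips URLs, @mentions, punctuation, bare numbers and one-letter tokens; the actual list of "features" removed is an assumption.
# sketch of pp.remove_features (assumption)
import re

url_re = re.compile(r'https?://\S+')
mention_re = re.compile(r'@\w+')
punc_re = re.compile(r'[^\w\s]')
num_re = re.compile(r'\b\d+\b')

def remove_features(data_str):
    # lowercase, then strip URLs, @mentions, punctuation, numbers and 1-letter tokens
    cleaned = url_re.sub('', data_str.lower())
    cleaned = mention_re.sub('', cleaned)
    cleaned = punc_re.sub('', cleaned)
    cleaned = num_re.sub('', cleaned)
    return ' '.join(w for w in cleaned.split() if len(w) > 1)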
In [12]:
rm_features_df.printSchema()
In [13]:
rm_features_df.show(4)
In [14]:
# tag the remaining words and keep only Nouns, Verbs and Adjectives
tagged_df = rm_features_df.withColumn("tagged_text", \
tag_and_remove_udf(rm_features_df.feat_text))
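A sketch of pp.tag_and_remove, assuming NLTK's POS tagger (requires nltk.download('averaged_perceptron_tagger')):
# sketch of pp.tag_and_remove (assumption)
import nltk

def tag_and_remove(data_str):
    # POS-tag the tokens and keep only nouns (NN*), verbs (VB*) and adjectives (JJ*)
    tagged = nltk.pos_tag(data_str.split())
    return ' '.join(word for word, tag in tagged if tag.startswith(('NN', 'VB', 'JJ')))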
In [15]:
tagged_df.printSchema()
In [16]:
tagged_df.show(4)
In [17]:
# lemmatization of remaining words to reduce dimensionality & boost measures
lemm_df = tagged_df.withColumn("lemm_text", lemmatize_udf(tagged_df["tagged_text"]))
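A sketch of pp.lemmatize, assuming NLTK's WordNet lemmatizer (requires nltk.download('wordnet')):
# sketch of pp.lemmatize (assumption)
from nltk.stem.wordnet import WordNetLemmatizer

lemmatizer = WordNetLemmatizer()

def lemmatize(data_str):
    # reduce each token to its WordNet lemma
    return ' '.join(lemmatizer.lemmatize(w) for w in data_str.split())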
In [19]:
# remove all rows containing only blank spaces
check_blanks_df = lemm_df.withColumn("is_blank", check_blanks_udf(lemm_df["lemm_text"]))
no_blanks_df = check_blanks_df.filter(check_blanks_df["is_blank"] == "False")
no_blanks_df.printSchema()
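Since the filter above compares is_blank to the string "False", pp.check_blanks presumably returns "True"/"False" as strings. A sketch under that assumption:
# sketch of pp.check_blanks (assumption): returns "True"/"False" as strings,
# matching the string comparison used in the filter above
def check_blanks(data_str):
    return str(data_str.strip() == '')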
In [20]:
# replace the raw text column with the lemmatized text
no_blanks_df = no_blanks_df.withColumn("text",no_blanks_df.lemm_text)
In [21]:
# dedupe: important since a lot of the tweets differ only by URLs and RT mentions
dedup_df = no_blanks_df.dropDuplicates(['text', 'label'])
In [22]:
# select only the columns we care about
data_set = dedup_df.select('id', 'text','label')
In [23]:
data_set.show(4)
In [24]:
# Split the data into training and test sets (40% held out for testing)
(trainingData, testData) = data_set.randomSplit([0.6, 0.4])
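randomSplit is non-deterministic between runs; if reproducibility matters, a seed can be passed (the seed value below is arbitrary):
# reproducible 60/40 split (seed chosen arbitrarily for illustration)
(trainingData, testData) = data_set.randomSplit([0.6, 0.4], seed=1234)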
In [25]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes, RandomForestClassifier
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
In [26]:
# Configure an ML pipeline, which consists of four stages: tokenizer, hashingTF, idf, and nb.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="rawFeatures")
# name the IDF output "features" so NaiveBayes (featuresCol="features") trains on the TF-IDF vectors
idf = IDF(minDocFreq=3, inputCol="rawFeatures", outputCol="features")
In [27]:
# Naive Bayes classifier (default smoothing of 1.0)
nb = NaiveBayes()
In [28]:
pipeline = Pipeline(stages=[tokenizer, hashingTF, idf, nb])
In [29]:
# Train the model. This also runs the tokenizer, hashingTF and idf stages.
model = pipeline.fit(trainingData)
In [30]:
# Make predictions.
predictions = model.transform(testData)
In [31]:
# Select example rows to display.
predictions.select("text", "label", "prediction").show(5)
In [32]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)
Out[32]:
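With no metricName set, the evaluator returns the F1 score by default; other metrics can be requested explicitly, for example weighted precision:
# same predictions, different metric ("weightedPrecision" is one of the built-in metricName options)
precision_evaluator = MulticlassClassificationEvaluator(predictionCol="prediction",
                                                        metricName="weightedPrecision")
precision_evaluator.evaluate(predictions)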
In [33]:
# Optional: hyperparameter tuning with cross-validation (uncomment to run)
# paramGrid = ParamGridBuilder().addGrid(nb.smoothing, [0.0, 1.0]).build()
# (a RandomForestClassifier stage would instead take a grid over rf.maxDepth and rf.impurity)
# cv = CrossValidator(estimator=pipeline,
#                     estimatorParamMaps=paramGrid,
#                     evaluator=MulticlassClassificationEvaluator(),
#                     numFolds=4)
# cvModel = cv.fit(trainingData)
In [34]:
# prediction = cvModel.transform(testData)